
Python for Vector Search

Before reading any explanation, predict the output:

import numpy as np

a = np.array([1.0, 0.0, 0.0])
b = np.array([0.707, 0.707, 0.0])
c = np.array([-1.0, 0.0, 0.0])

def cosine_similarity(v1, v2):
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

print(f"sim(a, a) = {cosine_similarity(a, a):.3f}")
print(f"sim(a, b) = {cosine_similarity(a, b):.3f}")
print(f"sim(a, c) = {cosine_similarity(a, c):.3f}")

# Now with unnormalised vectors:
d = np.array([100.0, 0.0, 0.0]) # same direction as a, 100x longer
print(f"sim(a, d) = {cosine_similarity(a, d):.3f}")

Write your predictions. Then continue.

# Output:
# sim(a, a) = 1.000 -- identical vectors: perfect similarity
# sim(a, b) = 0.707 -- 45 degrees apart: cos(45) = 0.707
# sim(a, c) = -1.000 -- opposite directions: minimum similarity
# sim(a, d) = 1.000 -- same direction, different magnitude: still 1.0

sim(a, d) = 1.000 is the key insight. A vector 100 times longer than another is just as similar if it points in the same direction: cosine similarity measures angle, not magnitude. This is why many text embedding models output unit-normalised vectors. For cosine-based retrieval the magnitude carries no useful information; the meaning is in the direction.

Vector search is how you make LLMs useful against your own data. The context window is finite; your knowledge base is not. Vector search is the bridge: encode documents as points in high-dimensional space, encode the user's query the same way, then find the nearest documents by angle. Those are the semantically similar documents. Feed them to the LLM as context. That is RAG.
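To make the shape of that pipeline concrete before any real model appears, here is a toy sketch. It replaces the embedding model with a bag-of-words count vector (the hypothetical `toy_embed`), so it only matches shared words, not meaning. The steps are still the ones a real RAG system runs: embed the documents, embed the query, rank by cosine similarity, and keep the top k as context.

```python
import math
from collections import Counter


def toy_embed(text: str) -> Counter:
    """Hypothetical stand-in for an embedding model: lowercase word counts.

    Real embedding models produce dense vectors that capture meaning;
    this toy version only captures shared vocabulary.
    """
    words = [w.strip(".,?!").lower() for w in text.split()]
    return Counter(w for w in words if w)


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(count * b[word] for word, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def retrieve(query: str, documents: list[str], k: int = 2) -> list[str]:
    """Embed every document and the query, then return the k closest documents."""
    doc_vectors = [toy_embed(doc) for doc in documents]
    query_vector = toy_embed(query)
    ranked = sorted(
        range(len(documents)),
        key=lambda i: cosine(query_vector, doc_vectors[i]),
        reverse=True,
    )
    return [documents[i] for i in ranked[:k]]


docs = [
    "A refund is issued within 14 days of a return.",
    "Our office is closed on public holidays.",
    "Return shipping labels are free.",
]
question = "When will my refund arrive after a return?"
context = retrieve(question, docs, k=2)

# The retrieved documents become the context the LLM sees
prompt = "Context:\n" + "\n".join(context) + f"\n\nQuestion: {question}"
print(prompt)
```

Swapping `toy_embed` for a real embedding model and the list scan for a vector index gives the architecture the rest of this lesson builds.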

What You Will Learn

  • What embeddings are and why semantic similarity maps to vector proximity
  • Generating embeddings with OpenAI, Sentence Transformers, and Cohere
  • Cosine similarity vs dot product vs L2: when to use each
  • FAISS for fast in-memory vector search (exact and approximate)
  • ChromaDB for persistent local vector search with metadata filtering
  • Pinecone for production cloud vector search (upsert, query, namespaces)
  • pgvector for Postgres-native vector search
  • Building a complete RAG pipeline: chunk, embed, store, retrieve, generate
  • Chunking strategies: fixed, sentence, semantic, recursive
  • Reranking with cross-encoders
  • Hybrid search: BM25 + vector + reciprocal rank fusion
  • Production considerations: embedding drift, index updates, metadata at scale

Prerequisites

  • Familiarity with NumPy arrays and matrix operations
  • Basic understanding of LLM API calls (Lessons 1-4)
  • Python dataclasses and type hints

Part 1 -- What Embeddings Are and Why They Work

The Geometric Intuition

An embedding model maps text into a fixed-size vector in a high-dimensional space (typically 384 to 3072 dimensions). The key property: semantically similar texts are mapped to nearby points.

"The dog chased the cat."           --> [0.23, -0.15, 0.87, ...]
"A cat was pursued by a dog."       --> [0.21, -0.14, 0.88, ...]    <-- similar
"Python is a programming language." --> [-0.41, 0.92, -0.33, ...]   <-- far away
"The stock market fell today."      --> [-0.12, 0.31, -0.67, ...]   <-- also far

The first two sentences have different syntax but the same meaning -- their embeddings are close. The other two sentences are on entirely different topics -- their embeddings are far away.

This works because embedding models are trained, usually with contrastive learning, to place related texts close together. The training signal: pairs of texts known to be related (a question and its answer, a sentence and its paraphrase) are pulled together, while unrelated texts are pushed apart.
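A minimal sketch of that training signal is an InfoNCE-style contrastive loss with in-batch negatives. Row i of `queries` should match row i of `positives`, and every other row in the batch serves as a negative. The function name and the temperature of 0.05 are illustrative assumptions. Real models differ in details such as hard negatives, temperature schedules and symmetric losses.

```python
import numpy as np


def info_nce_loss(queries: np.ndarray, positives: np.ndarray, temperature: float = 0.05) -> float:
    """InfoNCE-style loss with in-batch negatives (illustrative sketch).

    queries, positives: arrays of shape (batch, dim). Row i of positives is the
    matching text for row i of queries; all other rows act as negatives.
    """
    # Unit-normalise so the dot products below are cosine similarities
    q = queries / np.linalg.norm(queries, axis=1, keepdims=True)
    p = positives / np.linalg.norm(positives, axis=1, keepdims=True)

    logits = (q @ p.T) / temperature             # (batch, batch) similarity matrix
    logits -= logits.max(axis=1, keepdims=True)  # Numerical stability for exp()
    log_probs = logits - np.log(np.exp(logits).sum(axis=1, keepdims=True))

    # Cross-entropy where the correct "class" for row i is column i
    return float(-np.mean(np.diag(log_probs)))


rng = np.random.default_rng(0)
anchors = rng.normal(size=(4, 8))
aligned = anchors + 0.01 * rng.normal(size=(4, 8))  # Pairs that point the same way
shuffled = aligned[[1, 2, 3, 0]]                     # Pairs that are mismatched

print(f"aligned pairs:    {info_nce_loss(anchors, aligned):.3f}")   # low loss
print(f"mismatched pairs: {info_nce_loss(anchors, shuffled):.3f}")  # high loss
```

Minimising this loss is what pulls matching pairs together and pushes everything else in the batch apart.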

Why Cosine Similarity?

When you query a vector database, you need a distance (or similarity) metric. Three choices:

| Metric | Formula | Use when |
|---|---|---|
| Cosine similarity | dot(a, b) / (norm(a) * norm(b)) | Vectors may not be unit-normalised and only direction should count |
| Dot product | sum(a_i * b_i) | Vectors ARE unit-normalised (then dot product = cosine similarity); fastest operation |
| L2 (Euclidean) | sqrt(sum((a_i - b_i)^2)) | You care about magnitude (e.g., document importance scores embedded in magnitude) |

Many embedding models, including OpenAI's text-embedding-3 family, output unit-normalised vectors. In that case dot product and cosine similarity are identical, and dot product is cheaper because it skips the norm computations. Check your embedding model's documentation.

import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity: measures angle between vectors.

    Returns 1.0 for identical direction, 0.0 for orthogonal, -1.0 for opposite.
    Independent of vector magnitude.
    """
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))


def dot_product_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Dot product: works as similarity metric only for unit-normalised vectors.

    For unit vectors: dot(a,b) == cosine_similarity(a,b)
    For non-unit vectors: dot product favours longer vectors.
    """
    return float(np.dot(a, b))


def l2_distance(a: np.ndarray, b: np.ndarray) -> float:
    """L2 (Euclidean) distance: lower = more similar.

    Unlike cosine similarity, this is sensitive to magnitude.
    Convert to similarity: sim = 1 / (1 + distance)
    """
    return float(np.linalg.norm(a - b))


def normalise(v: np.ndarray) -> np.ndarray:
    """L2-normalise a vector so it has unit magnitude."""
    norm = np.linalg.norm(v)
    return v / norm if norm > 0 else v


# If vectors are already unit-normalised (as returned by most embedding APIs),
# dot product and cosine similarity are equivalent -- use dot product for speed
a = normalise(np.array([3.0, 4.0, 0.0]))
b = normalise(np.array([1.0, 1.0, 0.0]))

print(f"cosine_similarity: {cosine_similarity(a, b):.4f}")
print(f"dot_product: {dot_product_similarity(a, b):.4f}")
# Both print the same value because a and b are unit vectors
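The table's L2 row deserves one more check. For unit vectors, expanding the square gives ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a·b = 2 - 2 cos(a, b). Sorting by ascending L2 distance therefore produces the same order as sorting by descending cosine similarity. L2 only ranks differently when magnitudes vary. The snippet below checks this numerically on random unit vectors. The data is arbitrary.

```python
import numpy as np

rng = np.random.default_rng(42)
docs = rng.normal(size=(5, 16))
docs /= np.linalg.norm(docs, axis=1, keepdims=True)  # Unit-normalise each row
query = rng.normal(size=16)
query /= np.linalg.norm(query)

cosine_scores = docs @ query                         # Higher = more similar
l2_distances = np.linalg.norm(docs - query, axis=1)  # Lower = more similar

# For unit vectors: ||a - b||^2 = 2 - 2 * cos(a, b)
print(np.allclose(l2_distances ** 2, 2 - 2 * cosine_scores))  # True

# So both metrics rank the documents identically
print(np.argsort(-cosine_scores))
print(np.argsort(l2_distances))
```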

Part 2 -- Generating Embeddings

OpenAI text-embedding-3

import openai
import numpy as np

client = openai.OpenAI()  # Reads OPENAI_API_KEY from the environment


def embed_openai(
    texts: str | list[str],
    model: str = "text-embedding-3-small",
) -> np.ndarray:
    """Generate embeddings using the OpenAI embedding API.

    text-embedding-3-small: 1536 dimensions, $0.020/million tokens (at time of writing)
    text-embedding-3-large: 3072 dimensions, $0.130/million tokens (at time of writing)

    For most RAG applications, text-embedding-3-small is sufficient.
    Use large only if you measure a meaningful recall improvement.

    Args:
        texts: Single string or list of strings.
        model: Embedding model name.

    Returns:
        2D numpy array of shape (n_texts, embedding_dim).
    """
    if isinstance(texts, str):
        texts = [texts]

    # The API accepts up to 2048 inputs per request and also limits the total
    # tokens in one request. For larger batches, split the list and make multiple calls.
    response = client.embeddings.create(
        input=texts,
        model=model,
    )

    # Sort by each item's index so rows line up with the input order
    ordered = sorted(response.data, key=lambda item: item.index)
    vectors = np.array([item.embedding for item in ordered], dtype=np.float32)
    return vectors


# Test: semantically similar texts should have higher similarity
# (uses cosine_similarity from Part 1)
texts = [
"Python is a programming language.",
"Python is used for software development.",
"The Amazon river is in South America.",
]

embeddings = embed_openai(texts)
print(f"Shape: {embeddings.shape}") # (3, 1536)

sim_01 = cosine_similarity(embeddings[0], embeddings[1])
sim_02 = cosine_similarity(embeddings[0], embeddings[2])
print(f"sim('Python programming', 'Python development'): {sim_01:.3f}") # High
print(f"sim('Python programming', 'Amazon river'): {sim_02:.3f}") # Low
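The comment in `embed_openai` notes that larger batches need splitting. A minimal sketch of that splitting follows, using the 2048-input figure quoted above as an assumed limit; check the current API docs. `batched` is a hypothetical helper name, and Python 3.12+ ships `itertools.batched`, which yields tuples instead of lists. This sketch only enforces the input-count cap. Very long texts can still exceed the per-request token limit, so a production version would also track token counts.

```python
from collections.abc import Iterator

MAX_INPUTS_PER_REQUEST = 2048  # Assumed per-request input limit; check current API docs


def batched(texts: list[str], size: int = MAX_INPUTS_PER_REQUEST) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` texts, preserving order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(texts), size):
        yield texts[start:start + size]


# With a real client, each slice would go to embed_openai(), and the
# resulting arrays would be stacked with np.vstack in the same order.
corpus = [f"document {i}" for i in range(5000)]
sizes = [len(batch) for batch in batched(corpus)]
print(sizes)  # [2048, 2048, 904]
```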

Sentence Transformers (Local, Free)

from sentence_transformers import SentenceTransformer
import numpy as np


def build_local_embedder(model_name: str = "all-MiniLM-L6-v2"):
    """Build a local embedding function using Sentence Transformers.

    Popular models:
    - all-MiniLM-L6-v2: 384 dims, fast, ~80MB, good for English
    - all-mpnet-base-v2: 768 dims, better quality, ~420MB
    - paraphrase-multilingual-MiniLM-L12-v2: 384 dims, 50+ languages
    - BAAI/bge-large-en-v1.5: 1024 dims, strong English retrieval quality

    Downloads the model the first time it is loaded, then reuses the local cache.
    """
    model = SentenceTransformer(model_name)

    def embed(texts: str | list[str]) -> np.ndarray:
        if isinstance(texts, str):
            texts = [texts]
        # encode() returns a 2D numpy array. Normalisation is off by default,
        # so request it explicitly to make dot product equal cosine similarity.
        return model.encode(
            texts,
            convert_to_numpy=True,
            normalize_embeddings=True,
            show_progress_bar=len(texts) > 100,
        ).astype(np.float32)

    # Attach metadata to the function object (used later to size indexes)
    embed.model_name = model_name
    embed.dim = model.get_sentence_embedding_dimension()
    return embed


# Usage
embed = build_local_embedder("all-MiniLM-L6-v2")
vecs = embed(["Hello world", "Hi there"])
print(f"Shape: {vecs.shape}") # (2, 384)
print(f"Norm: {np.linalg.norm(vecs[0]):.4f}") # ~1.0 (unit normalised)

Batch Embedding with Rate Limiting

import math
import time
from typing import Callable

import numpy as np


def embed_in_batches(
    texts: list[str],
    embed_fn: Callable[[list[str]], np.ndarray],
    batch_size: int = 100,
    requests_per_minute: int = 500,  # API rate limit
    show_progress: bool = True,
) -> np.ndarray:
    """Embed a large list of texts in batches, respecting rate limits.

    Args:
        texts: All texts to embed.
        embed_fn: Embedding function that accepts a list of strings.
        batch_size: Number of texts per API call.
        requests_per_minute: API rate limit in requests per minute.
        show_progress: Print progress every 10 batches.

    Returns:
        2D numpy array of shape (len(texts), embedding_dim).
    """
    if not texts:
        raise ValueError("texts must not be empty")

    all_embeddings = []
    n_batches = math.ceil(len(texts) / batch_size)
    min_seconds_per_request = 60.0 / requests_per_minute

    for i in range(0, len(texts), batch_size):
        batch_start = time.monotonic()
        batch = texts[i: i + batch_size]
        batch_embeddings = embed_fn(batch)
        all_embeddings.append(batch_embeddings)

        batch_num = i // batch_size + 1
        if show_progress and batch_num % 10 == 0:
            print(f"Embedded batch {batch_num}/{n_batches} ({i + len(batch):,}/{len(texts):,} texts)")

        # Rate limiting: sleep for the remainder of the minimum request interval
        elapsed = time.monotonic() - batch_start
        sleep_time = min_seconds_per_request - elapsed
        if sleep_time > 0 and i + batch_size < len(texts):
            time.sleep(sleep_time)

    return np.vstack(all_embeddings)
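`embed_in_batches` paces requests, but it does not handle a request that is rejected anyway, for example with an HTTP 429 during a traffic spike. A common complement is retrying with exponential backoff and jitter. The sketch below is generic: `call_with_backoff` and `flaky_embed` are hypothetical names. In real code, narrow `retry_on` to your client library's rate-limit and timeout exceptions rather than retrying every error.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying with exponential backoff and full jitter on failure.

    The delay before retry n (0-based) is uniform in
    [0, min(max_delay, base_delay * 2**n)].
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, cap))
    raise RuntimeError("unreachable")  # The loop always returns or raises


attempts = {"count": 0}


def flaky_embed() -> list[float]:
    """Hypothetical embedding call that fails twice, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated rate limit")
    return [0.1, 0.2, 0.3]


delays: list[float] = []  # Record the delays instead of actually sleeping
result = call_with_backoff(flaky_embed, retry_on=(ConnectionError,), sleep=delays.append)
print(result, attempts["count"], len(delays))  # [0.1, 0.2, 0.3] 3 2
```

To combine the two, wrap the embedding function passed to `embed_in_batches`, for example `embed_fn=lambda batch: call_with_backoff(lambda: embed_openai(batch))`.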

Part 3 -- FAISS for Fast In-Memory Search

FAISS (Facebook AI Similarity Search) is a widely used library for fast in-memory vector search. It runs on CPU and GPU, supports exact and approximate nearest neighbour search, and scales to billions of vectors when combined with its compressed index types.

import faiss
import numpy as np
from dataclasses import dataclass


@dataclass
class SearchResult:
    """A single search result from a vector query."""
    index: int       # Position in the original document list
    distance: float  # Lower distance = more similar (1 - score for cosine)
    score: float     # Higher score = more similar (cosine similarity for unit vectors)


class FAISSIndex:
    """Wrapper around FAISS for semantic search over text documents.

    Supports two index types:
    - IndexFlatIP: Exact inner product search. Use for unit-normalised vectors.
      Brute force, O(n*d) per query: slow for large n but always exact.
    - IndexIVFFlat: Approximate search using an inverted file index.
      Faster for large collections but may miss some nearest neighbours.
    """

    def __init__(
        self,
        dim: int,
        approximate: bool = False,
        n_clusters: int = 100,  # For IVFFlat only
    ):
        """
        Args:
            dim: Embedding dimension.
            approximate: Use IVFFlat approximate search (faster for >100K vectors).
            n_clusters: Number of Voronoi cells for IVFFlat clustering.
        """
        self.dim = dim
        self._documents: list[str] = []

        if approximate:
            # IVFFlat: Approximate nearest neighbour search.
            # Requires training on a sample before use.
            # Keep a reference to the quantiser so it lives as long as the index.
            self._quantiser = faiss.IndexFlatIP(dim)  # Holds the cluster centroids
            self._index = faiss.IndexIVFFlat(
                self._quantiser, dim, n_clusters, faiss.METRIC_INNER_PRODUCT
            )
            self._trained = False
        else:
            # FlatIP: Exact inner product (= cosine similarity for unit vectors).
            # No training required. Use for collections up to ~100K documents.
            self._index = faiss.IndexFlatIP(dim)
            self._trained = True  # Flat index needs no training

    def train(self, training_vectors: np.ndarray) -> None:
        """Train the approximate index on a sample of vectors.

        Required only for approximate (IVFFlat) indices. The training vectors
        should be representative of the full collection. FAISS's k-means expects
        roughly 39 to 256 training vectors per cluster: it warns below that
        and subsamples above it.
        """
        if not isinstance(self._index, faiss.IndexIVFFlat):
            return  # Flat index needs no training
        # Copy, then normalise in place so centroids live on the unit sphere
        # like the vectors added later (the caller's array is not modified).
        vectors = np.array(training_vectors, dtype=np.float32, order="C")
        faiss.normalize_L2(vectors)
        self._index.train(vectors)
        self._trained = True

    def add(self, documents: list[str], embeddings: np.ndarray) -> None:
        """Add documents and their embeddings to the index.

        Args:
            documents: List of text strings (for retrieval).
            embeddings: 2D array of shape (n_docs, dim), unit-normalised.
        """
        if not self._trained:
            raise RuntimeError(
                "Index must be trained before adding vectors. "
                "Call train() with a representative sample first."
            )
        if len(documents) != len(embeddings):
            raise ValueError("documents and embeddings must have the same length")

        embeddings = np.array(embeddings, dtype=np.float32, order="C")

        # Verify unit normalisation (inner product equals cosine only for unit vectors)
        norms = np.linalg.norm(embeddings, axis=1)
        if not np.allclose(norms, 1.0, atol=0.01):
            # Auto-normalise with a warning -- in production, normalise upstream
            print("Warning: embeddings are not unit-normalised. Auto-normalising.")
            embeddings = embeddings / norms[:, np.newaxis]

        self._index.add(embeddings)
        self._documents.extend(documents)

    def search(
        self,
        query_embedding: np.ndarray,
        k: int = 5,
        n_probe: int = 10,  # For IVFFlat: number of clusters to search
    ) -> list[SearchResult]:
        """Find the k most similar documents to the query.

        Args:
            query_embedding: 1D or 2D array of shape (dim,) or (1, dim).
            k: Number of results to return.
            n_probe: For approximate search, number of clusters to probe.
                Higher = more accurate, slower. Default 10 is a good start.

        Returns:
            List of SearchResult sorted by similarity (most similar first).
        """
        if self.size == 0:
            return []  # Nothing indexed yet

        query = query_embedding.astype(np.float32)
        if query.ndim == 1:
            query = query.reshape(1, -1)

        # Normalise the query vector so inner product equals cosine similarity
        query = query / np.linalg.norm(query, axis=1, keepdims=True)

        if hasattr(self._index, "nprobe"):
            self._index.nprobe = n_probe

        k = min(k, self.size)  # Cannot return more than we have
        # For inner-product indexes, FAISS's "distances" are similarity scores
        distances, indices = self._index.search(query, k)

        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx == -1:  # FAISS pads with -1 when fewer than k results are found
                continue
            results.append(SearchResult(
                index=int(idx),
                distance=float(1.0 - dist),  # Convert IP score to cosine distance
                score=float(dist),
            ))

        return results

    def get_document(self, index: int) -> str:
        """Retrieve a document by its index."""
        return self._documents[index]

    @property
    def size(self) -> int:
        return self._index.ntotal


# Complete usage example
def demo_faiss_search():
    documents = [
        "Python is a high-level programming language.",
        "Machine learning uses algorithms to find patterns.",
        "The Eiffel Tower is in Paris, France.",
        "Gradient descent optimises neural network weights.",
        "The Python programming language was created by Guido van Rossum.",
        "Paris is the capital city of France.",
    ]

    embed = build_local_embedder("all-MiniLM-L6-v2")
    embeddings = embed(documents)

    index = FAISSIndex(dim=embed.dim, approximate=False)
    index.add(documents, embeddings)

    query = "What programming language was designed by Guido van Rossum?"
    query_vec = embed(query)[0]

    results = index.search(query_vec, k=3)
    print(f"Query: {query}\n")
    for result in results:
        print(f"  [{result.score:.3f}] {index.get_document(result.index)}")
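To see what `n_clusters` and `n_probe` trade off, here is a toy inverted-file index in plain NumPy. It mirrors the idea behind IndexIVFFlat: cluster the vectors with k-means, then at query time scan only the `nprobe` clusters whose centroids are closest to the query. It is not FAISS's implementation. `kmeans` and `ToyIVF` are hypothetical names, and FAISS's clustering and storage are far more optimised.

```python
import numpy as np


def kmeans(x: np.ndarray, n_clusters: int, n_iter: int = 10, seed: int = 0) -> np.ndarray:
    """Plain k-means for unit vectors (assignment by inner product), unit-norm centroids."""
    rng = np.random.default_rng(seed)
    centroids = x[rng.choice(len(x), n_clusters, replace=False)].copy()
    for _ in range(n_iter):
        assign = np.argmax(x @ centroids.T, axis=1)  # Nearest centroid by inner product
        for c in range(n_clusters):
            members = x[assign == c]
            if len(members):                          # Leave empty clusters where they are
                centroids[c] = members.mean(axis=0)
        centroids /= np.linalg.norm(centroids, axis=1, keepdims=True)
    return centroids


class ToyIVF:
    """Inverted-file search: cluster vectors, then scan only the nprobe closest clusters."""

    def __init__(self, vectors: np.ndarray, n_clusters: int):
        self.vectors = vectors
        self.centroids = kmeans(vectors, n_clusters)
        assign = np.argmax(vectors @ self.centroids.T, axis=1)
        # Inverted lists: position c holds the ids of vectors assigned to cluster c
        self.lists = [np.flatnonzero(assign == c) for c in range(n_clusters)]

    def search(self, query: np.ndarray, k: int, nprobe: int) -> np.ndarray:
        # 1. Rank clusters by centroid similarity and keep the nprobe best
        probe = np.argsort(-(self.centroids @ query))[:nprobe]
        # 2. Score only the vectors stored in those clusters
        candidates = np.concatenate([self.lists[c] for c in probe])
        scores = self.vectors[candidates] @ query
        return candidates[np.argsort(-scores)[:k]]


rng = np.random.default_rng(1)
data = rng.normal(size=(2000, 32))
data /= np.linalg.norm(data, axis=1, keepdims=True)
query = data[0] + 0.1 * rng.normal(size=32)
query /= np.linalg.norm(query)

exact = np.argsort(-(data @ query))[:10]  # Exact top-10, the result a flat index returns
ivf = ToyIVF(data, n_clusters=20)
recalls = {}
for nprobe in (1, 5, 20):
    found = ivf.search(query, k=10, nprobe=nprobe)
    recalls[nprobe] = len(set(found.tolist()) & set(exact.tolist())) / 10
    print(f"nprobe={nprobe:2d}  recall@10={recalls[nprobe]:.1f}")
```

With `nprobe` equal to the number of clusters, every vector is scanned and the result matches exact search. Smaller values scan fewer vectors and can miss neighbours that sit in adjacent cells. The exact recall numbers depend on the data and the random seed.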

Part 4 -- ChromaDB for Persistent Local Search

ChromaDB is one of the simplest persistent vector databases for Python. It handles embedding storage, metadata, and filtering in-process, so you do not need to run a separate server.

import hashlib
from typing import Any

import chromadb
from chromadb.utils import embedding_functions


class ChromaDocumentStore:
    """Persistent document store using ChromaDB.

    Use this for:
    - Development and prototyping (no server to run)
    - Single-machine production deployments with moderate data volumes
    - When you need metadata filtering alongside vector search

    Limitations:
    - Single machine only (not distributed)
    - Performance degrades at multi-million document scale
    """

    def __init__(
        self,
        collection_name: str,
        persist_directory: str = "./chroma_db",
        embedding_model: str = "all-MiniLM-L6-v2",
    ):
        """
        Args:
            collection_name: Logical name for this set of documents.
            persist_directory: Path to store the database files on disk.
            embedding_model: Sentence Transformers model to use for embedding.
        """
        self._client = chromadb.PersistentClient(path=persist_directory)

        # ChromaDB can embed documents automatically using this function,
        # or you can pass pre-computed embeddings instead
        self._ef = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )

        self._collection = self._client.get_or_create_collection(
            name=collection_name,
            embedding_function=self._ef,
            metadata={"hnsw:space": "cosine"},  # Use cosine distance
        )

    def add(
        self,
        documents: list[str],
        metadatas: list[dict] | None = None,
        ids: list[str] | None = None,
    ) -> None:
        """Add documents to the collection (an existing ID is overwritten).

        Args:
            documents: List of text documents.
            metadatas: Optional list of metadata dicts (one per document).
                Metadata fields can be used to filter search results.
            ids: Optional list of unique IDs. Derived from a content hash if not provided.
        """
        if ids is None:
            ids = [hashlib.md5(doc.encode()).hexdigest() for doc in documents]

        # upsert() calls the embedding function automatically and, unlike add(),
        # overwrites existing IDs, so re-running an ingestion job is safe.
        # metadatas stays None when not provided, because some Chroma versions
        # reject empty metadata dicts.
        self._collection.upsert(
            documents=documents,
            metadatas=metadatas,
            ids=ids,
        )

    def search(
        self,
        query: str,
        k: int = 5,
        where: dict | None = None,           # Metadata filter
        where_document: dict | None = None,  # Document content filter
    ) -> list[dict[str, Any]]:
        """Search for documents similar to the query.

        Args:
            query: Search query text (will be embedded automatically).
            k: Number of results to return.
            where: Metadata filter dict. Example: {"source": "arxiv"}
                Supports operators: $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin;
                combine conditions with $and / $or.
            where_document: Filter on document content: {"$contains": "Python"}

        Returns:
            List of result dicts with keys: id, document, metadata, distance, score.
        """
        query_params: dict[str, Any] = {
            "query_texts": [query],
            "n_results": k,
        }
        if where:
            query_params["where"] = where
        if where_document:
            query_params["where_document"] = where_document

        results = self._collection.query(**query_params)

        # Flatten ChromaDB's batched response format (one inner list per query text)
        output = []
        for doc, meta, dist, doc_id in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
            results["ids"][0],
        ):
            output.append({
                "id": doc_id,
                "document": doc,
                "metadata": meta,
                "distance": dist,
                "score": 1.0 - dist,  # Cosine distance -> cosine similarity
            })

        return output

    @property
    def count(self) -> int:
        return self._collection.count()


# Usage with metadata filtering
def demo_chroma_with_metadata():
    store = ChromaDocumentStore("research_papers")

    documents = [
        "Attention is all you need: the Transformer architecture.",
        "BERT: Pre-training of Deep Bidirectional Transformers.",
        "GPT-3: Language models are few-shot learners.",
        "FAISS: A library for efficient similarity search.",
        "pgvector: Open-source vector similarity search for Postgres.",
    ]
    metadatas = [
        {"category": "nlp", "year": 2017, "citations": 100000},
        {"category": "nlp", "year": 2018, "citations": 50000},
        {"category": "nlp", "year": 2020, "citations": 30000},
        {"category": "vector-search", "year": 2019, "citations": 5000},
        {"category": "vector-search", "year": 2021, "citations": 1000},
    ]

    store.add(documents, metadatas)

    # Search all documents
    print("All results:")
    for r in store.search("transformer architecture for NLP", k=3):
        print(f"  [{r['score']:.3f}] {r['document'][:60]}")

    # Search only vector-search papers
    print("\nVector search papers only:")
    for r in store.search(
        "similarity search at scale",
        k=3,
        where={"category": {"$eq": "vector-search"}},
    ):
        print(f"  [{r['score']:.3f}] {r['document'][:60]}")

    # Search papers after 2018
    print("\nPapers after 2018:")
    for r in store.search("language model pretraining", k=3, where={"year": {"$gt": 2018}}):
        print(f"  [{r['score']:.3f}] ({r['metadata']['year']}) {r['document'][:60]}")
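A `where` filter is a predicate over each document's metadata: the store keeps the documents that satisfy it and ranks only those by similarity. The sketch below spells out that predicate for the operators listed in `search()`. It is a simplified model, not Chroma's code. `matches` and `OPERATORS` are hypothetical names. The sketch treats missing fields as non-matching and accepts several top-level keys as an implicit AND, which Chroma itself may reject (use `$and` there). Check how your store handles both cases.

```python
from typing import Any

# Comparison operators in the style of ChromaDB's `where` filters
OPERATORS = {
    "$eq": lambda value, target: value == target,
    "$ne": lambda value, target: value != target,
    "$gt": lambda value, target: value > target,
    "$gte": lambda value, target: value >= target,
    "$lt": lambda value, target: value < target,
    "$lte": lambda value, target: value <= target,
    "$in": lambda value, target: value in target,
    "$nin": lambda value, target: value not in target,
}


def matches(metadata: dict[str, Any], where: dict[str, Any]) -> bool:
    """Return True if `metadata` satisfies every condition in `where`."""
    for key, condition in where.items():
        if key == "$and":
            if not all(matches(metadata, sub) for sub in condition):
                return False
            continue
        if key == "$or":
            if not any(matches(metadata, sub) for sub in condition):
                return False
            continue
        if key not in metadata:
            return False  # Missing fields never match in this sketch
        if not isinstance(condition, dict):
            condition = {"$eq": condition}  # Shorthand: {"field": value}
        for op, target in condition.items():
            if not OPERATORS[op](metadata[key], target):
                return False
    return True


papers = [
    {"category": "nlp", "year": 2017},
    {"category": "vector-search", "year": 2019},
    {"category": "vector-search", "year": 2021},
]
recent_vector_search = [
    p for p in papers
    if matches(p, {"$and": [{"category": "vector-search"}, {"year": {"$gt": 2019}}]})
]
print(recent_vector_search)  # [{'category': 'vector-search', 'year': 2021}]
```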

Part 5 -- Pinecone for Production Cloud Search

Pinecone is a fully managed vector database. Use it when you need:

  • More than a few million vectors
  • Multiple readers/writers across distributed services
  • Automatic scaling without managing infrastructure
  • Low-latency queries from multiple geographic regions

import time
from typing import Any

import numpy as np
from pinecone import Pinecone, ServerlessSpec


class PineconeDocumentStore:
    """Production vector store using Pinecone Serverless.

    Pinecone concepts:
    - Index: Like a database. One index per application or use case.
    - Namespace: Logical partition within an index. Use to separate tenants,
      content types, or versions without creating separate indexes.
    - Vector: ID + embedding + optional metadata (up to 40KB).
    - Upsert: Insert-or-update by ID. If the ID exists, it is overwritten.
    """

    def __init__(
        self,
        api_key: str,
        index_name: str,
        dim: int,
        region: str = "us-east-1",
    ):
        self._pc = Pinecone(api_key=api_key)

        # Create the index if it doesn't exist
        existing = [idx.name for idx in self._pc.list_indexes()]
        if index_name not in existing:
            self._pc.create_index(
                name=index_name,
                dimension=dim,
                metric="cosine",  # or "dotproduct" for pre-normalised vectors
                spec=ServerlessSpec(cloud="aws", region=region),
            )
            # Wait for the index to be ready
            while not self._pc.describe_index(index_name).status["ready"]:
                time.sleep(1)

        self._index = self._pc.Index(index_name)

    def upsert(
        self,
        ids: list[str],
        embeddings: np.ndarray,
        metadatas: list[dict] | None = None,
        namespace: str = "",
        batch_size: int = 100,
    ) -> None:
        """Upsert vectors into the index.

        Args:
            ids: Unique string IDs for each vector.
            embeddings: 2D float array of shape (n_vectors, dim).
            metadatas: Optional metadata dicts. Metadata is returned in query
                results and can be used for filtering.
            namespace: Optional namespace to partition vectors.
            batch_size: Vectors per upsert request. Pinecone limits each request
                by record count (1,000) and payload size (2 MB); with
                high-dimensional vectors the size cap can bind first, so lower
                this if requests are rejected.
        """
        if metadatas is None:
            metadatas = [{} for _ in ids]

        vectors = embeddings.astype(np.float64).tolist()  # Plain Python floats for the request

        for i in range(0, len(ids), batch_size):
            batch = []
            for j in range(i, min(i + batch_size, len(ids))):
                record = {"id": ids[j], "values": vectors[j]}
                if metadatas[j]:  # Only attach metadata when there is some
                    record["metadata"] = metadatas[j]
                batch.append(record)
            self._index.upsert(vectors=batch, namespace=namespace)

    def query(
        self,
        query_embedding: np.ndarray,
        k: int = 5,
        filter: dict | None = None,
        namespace: str = "",
        include_metadata: bool = True,
    ) -> list[dict[str, Any]]:
        """Query for the k nearest vectors.

        Args:
            query_embedding: 1D array of shape (dim,).
            k: Number of results.
            filter: Metadata filter. Pinecone supports $eq, $ne, $in, $nin,
                $gt, $gte, $lt, $lte, $and, $or. The range operators
                ($gt, $gte, $lt, $lte) compare numbers, so store dates as
                numbers (e.g. Unix timestamps or a year field).
                Example: {"category": {"$eq": "news"}, "year": {"$gte": 2025}}
            namespace: Namespace to search within.
            include_metadata: Whether to return metadata with results.

        Returns:
            List of result dicts: {id, score, metadata}.
        """
        query_list = query_embedding.astype(np.float64).tolist()

        response = self._index.query(
            vector=query_list,
            top_k=k,
            filter=filter,
            namespace=namespace,
            include_metadata=include_metadata,
            include_values=False,  # Do not return the full embedding vector
        )

        return [
            {
                "id": match.id,
                "score": match.score,
                "metadata": match.metadata or {},
            }
            for match in response.matches
        ]

    def delete(self, ids: list[str], namespace: str = "") -> None:
        """Delete vectors by ID."""
        self._index.delete(ids=ids, namespace=namespace)

    @property
    def stats(self) -> Any:
        """Return index statistics (vector counts per namespace, dimension, ...)."""
        return self._index.describe_index_stats()
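Pinecone documents per-request limits on both record count and payload size: 1,000 records and 2 MB at the time of writing, but check the current docs. A fixed `batch_size` can therefore be too large for high-dimensional vectors. Below is a sketch of size-aware batching under those assumed limits. It uses the JSON length of each record as a proxy for request size. `size_bounded_batches` is a hypothetical helper, and each batch it yields would be passed to `self._index.upsert(vectors=batch, namespace=namespace)`.

```python
import json
from collections.abc import Iterator

MAX_RECORDS = 1000           # Assumed per-request record limit; check Pinecone's docs
MAX_BYTES = 2 * 1024 * 1024  # Assumed per-request size limit (2 MB)


def size_bounded_batches(
    records: list[dict],
    max_records: int = MAX_RECORDS,
    max_bytes: int = MAX_BYTES,
) -> Iterator[list[dict]]:
    """Group records so each batch respects a count cap and an approximate byte cap.

    Size is estimated from each record's JSON encoding, which approximates
    (but is not guaranteed to equal) what the client sends over the wire.
    """
    batch: list[dict] = []
    batch_bytes = 0
    for record in records:
        record_bytes = len(json.dumps(record).encode("utf-8"))
        if record_bytes > max_bytes:
            raise ValueError(f"record {record.get('id')!r} alone exceeds max_bytes")
        if batch and (len(batch) >= max_records or batch_bytes + record_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += record_bytes
    if batch:
        yield batch


# 250 records with 1536-dimensional vectors (the text-embedding-3-small size)
records = [{"id": f"doc-{i}", "values": [0.123456789] * 1536} for i in range(250)]
batches = list(size_bounded_batches(records))
print([len(b) for b in batches])
```

With 1536-dimensional vectors in this JSON estimate, the byte cap, not the record cap, decides the batch size: each batch holds roughly a hundred records.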

Part 6 -- pgvector for Postgres-Native Search

pgvector adds vector similarity search to PostgreSQL. Use it when:

  • You already use Postgres and want to avoid a separate vector database
  • You need transactional consistency between vector updates and related data
  • Your vector counts are moderate (up to a few million)
  • You need JOIN operations between vectors and relational data

import json

import asyncpg
import numpy as np


class PgVectorStore:
    """Vector search in PostgreSQL using the pgvector extension.

    pgvector operators:
    - <->  L2 distance (lower = more similar)
    - <#>  Negative inner product (use -1 * (a <#> b) for dot product similarity)
    - <=>  Cosine distance (1 - cosine_similarity, lower = more similar)

    Index types:
    - ivfflat: Approximate. Faster to build and uses less memory than hnsw,
      but has a worse speed/recall trade-off. Build it after the table has data.
    - hnsw: Approximate. Better speed/recall trade-off than ivfflat, at the
      cost of slower builds and more memory.
    """

    def __init__(self, pool: asyncpg.Pool, table_name: str = "documents"):
        # The table name is interpolated into SQL below, so accept plain identifiers only
        if not table_name.isidentifier():
            raise ValueError(f"Invalid table name: {table_name!r}")
        self._pool = pool
        self._table = table_name

    async def setup(self, dim: int) -> None:
        """Create the documents table and vector index if they don't exist."""
        async with self._pool.acquire() as conn:
            # Enable the pgvector extension
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

            # Create the documents table
            await conn.execute(f"""
                CREATE TABLE IF NOT EXISTS {self._table} (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    embedding vector({dim}) NOT NULL,
                    metadata JSONB DEFAULT '{{}}'::jsonb,
                    created_at TIMESTAMPTZ DEFAULT now()
                )
            """)

            # Create an HNSW index for fast approximate cosine similarity search.
            # m = max connections per node (higher = better recall, more memory)
            # ef_construction = candidate list size during build (higher = better
            # graph quality, slower build). 16 and 64 are pgvector's defaults.
            # HNSW on the vector type supports up to 2,000 dimensions.
            await conn.execute(f"""
                CREATE INDEX IF NOT EXISTS {self._table}_embedding_idx
                ON {self._table}
                USING hnsw (embedding vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
            """)

    async def upsert(
        self,
        id: str,
        content: str,
        embedding: np.ndarray,
        metadata: dict | None = None,
    ) -> None:
        """Insert or update a document."""
        # pgvector accepts embeddings as a string like "[0.1,0.2,0.3]"
        embedding_str = "[" + ",".join(f"{v:.8f}" for v in embedding.tolist()) + "]"

        async with self._pool.acquire() as conn:
            await conn.execute(f"""
                INSERT INTO {self._table} (id, content, embedding, metadata)
                VALUES ($1, $2, $3::vector, $4::jsonb)
                ON CONFLICT (id) DO UPDATE
                SET content = EXCLUDED.content,
                    embedding = EXCLUDED.embedding,
                    metadata = EXCLUDED.metadata
            """, id, content, embedding_str, json.dumps(metadata or {}))

    async def search(
        self,
        query_embedding: np.ndarray,
        k: int = 5,
        metadata_filter: str = "",  # Raw SQL WHERE clause fragment (trusted input only)
        filter_params: list | None = None,
    ) -> list[dict]:
        """Cosine similarity search with optional metadata filtering.

        Args:
            query_embedding: Query vector.
            k: Number of results.
            metadata_filter: SQL WHERE clause for metadata filtering. It is
                inserted into the query text verbatim, so never build it from
                user input; pass values through filter_params instead.
                Example: "metadata->>'category' = $3"
            filter_params: Parameters for the metadata_filter clause
                ($1 and $2 are taken by the embedding and the limit).

        Note:
            With an HNSW index, the filter is applied after the approximate
            index scan, so a selective filter can return fewer than k rows.
            Raising hnsw.ef_search increases the number of candidates scanned.

        Returns:
            List of result dicts: {id, content, metadata, similarity}.
        """
        embedding_str = "[" + ",".join(f"{v:.8f}" for v in query_embedding.tolist()) + "]"

        where_clause = f"WHERE {metadata_filter}" if metadata_filter else ""
        params_list = filter_params or []

        # hnsw.ef_search controls query-time accuracy (pgvector's default is 40):
        # higher = better recall, slower query. SET LOCAL scopes the setting to
        # this transaction, so it does not leak to other users of the pooled connection.
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                await conn.execute("SET LOCAL hnsw.ef_search = 100")

                rows = await conn.fetch(f"""
                    SELECT
                        id,
                        content,
                        metadata,
                        1 - (embedding <=> $1::vector) AS similarity
                    FROM {self._table}
                    {where_clause}
                    ORDER BY embedding <=> $1::vector
                    LIMIT $2
                """, embedding_str, k, *params_list)

        return [
            {
                "id": row["id"],
                "content": row["content"],
                "metadata": json.loads(row["metadata"]),  # asyncpg returns jsonb as str by default
                "similarity": float(row["similarity"]),
            }
            for row in rows
        ]
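Because `metadata_filter` is spliced into the SQL text, one safe pattern for equality filters is jsonb containment with the filter sent as a bound parameter. In Postgres, `metadata @> '{"category": "nlp"}'` is true when `metadata` contains every key/value pair on the right. A minimal sketch, where `containment_filter` is a hypothetical helper: it numbers its parameter from `$3` because `PgVectorStore.search` already uses `$1` for the embedding and `$2` for the limit. Containment only expresses equality. Range conditions still need clauses such as `(metadata->>'year')::int > $4`.

```python
import json
from typing import Any


def containment_filter(filters: dict[str, Any], first_param: int = 3) -> tuple[str, list[str]]:
    """Build a parameterised jsonb containment filter for PgVectorStore.search.

    Returns (where_fragment, params). The JSON travels as a bound parameter,
    so user-supplied values are never spliced into the SQL text.
    """
    if not filters:
        return "", []
    return f"metadata @> ${first_param}::jsonb", [json.dumps(filters)]


clause, params = containment_filter({"category": "nlp", "year": 2017})
print(clause)  # metadata @> $3::jsonb
print(params)  # ['{"category": "nlp", "year": 2017}']

# Usage (inside an async function, with a configured store):
#   results = await store.search(query_vec, k=5, metadata_filter=clause, filter_params=params)
```

A GIN index on the column (`CREATE INDEX ... ON documents USING gin (metadata)`) lets Postgres answer `@>` conditions without scanning every row's metadata.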

When to Use pgvector vs a Dedicated Vector DB

pgvector is the right choice when:
- Your data is already in Postgres and you want ACID consistency
- You need to JOIN embeddings with other tables in a single query
- Vector counts are under ~5 million
- You want to avoid operational complexity of a second database

Pinecone/Weaviate/Qdrant are the right choice when:
- Vector counts exceed 10 million
- You need sub-10ms query latency at high concurrency
- You have multi-region requirements
- Your team lacks Postgres expertise for tuning and operations

Part 7 -- Building a Complete RAG Pipeline

import hashlib
from dataclasses import dataclass, field
from typing import Any, Callable

import numpy as np
import openai

# ChromaDocumentStore is defined in the ChromaDB section above


@dataclass
class Document:
    """A chunk of text with associated metadata."""
    id: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)
    embedding: np.ndarray | None = None


class RAGPipeline:
    """A complete retrieval-augmented generation pipeline.

    Covers the full cycle: ingest documents, retrieve relevant chunks
    for a query, and generate a grounded answer.

    In production, the ingestion phase (chunk, embed, store) runs offline
    or incrementally as new documents arrive. The retrieval + generation
    phase runs per-request on the hot path.
    """

    def __init__(
        self,
        vector_store: ChromaDocumentStore,
        embed_fn: Callable[[list[str]], np.ndarray] | None = None,
        llm_client: openai.OpenAI | None = None,
        llm_model: str = "gpt-4o",
    ):
        self._store = vector_store
        # Only needed for stores that take pre-computed embeddings;
        # ChromaDocumentStore embeds text itself, so it is unused here.
        self._embed = embed_fn
        self._llm = llm_client or openai.OpenAI()
        self._llm_model = llm_model

    # ------------------------------------------------------------------ Ingestion

    def ingest(
        self,
        documents: list[str],
        metadatas: list[dict] | None = None,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
    ) -> int:
        """Chunk, embed, and store a list of documents.

        Returns:
            Number of unique chunks stored.
        """
        all_chunks: list[str] = []
        all_metas: list[dict] = []

        for i, doc in enumerate(documents):
            meta = metadatas[i] if metadatas else {}
            chunks = list(self._chunk_text(doc, chunk_size, chunk_overlap))
            all_chunks.extend(chunks)
            all_metas.extend([{**meta, "chunk_index": j} for j in range(len(chunks))])

        if not all_chunks:
            return 0

        # Content-hash IDs make re-ingestion idempotent. Identical chunks would
        # produce duplicate IDs in one call, so keep only the first copy of each.
        unique: dict[str, tuple[str, dict]] = {}
        for chunk, meta in zip(all_chunks, all_metas):
            chunk_id = hashlib.sha256(chunk.encode()).hexdigest()[:16]
            unique.setdefault(chunk_id, (chunk, meta))

        self._store.add(
            [chunk for chunk, _ in unique.values()],
            [meta for _, meta in unique.values()],
            list(unique),
        )
        return len(unique)

    def _chunk_text(
        self,
        text: str,
        chunk_size: int,
        overlap: int,
    ):
        """Simple word-count based chunking with overlap."""
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")
        words = text.split()
        step = chunk_size - overlap
        for i in range(0, len(words), step):
            chunk = " ".join(words[i: i + chunk_size])
            if chunk.strip():
                yield chunk
            if i + chunk_size >= len(words):
                break

    # ------------------------------------------------------------------ Retrieval

    def retrieve(
        self,
        query: str,
        k: int = 5,
        metadata_filter: dict | None = None,
    ) -> list[dict]:
        """Retrieve the most relevant chunks for a query.

        Args:
            query: User's question or search string.
            k: Number of chunks to retrieve.
            metadata_filter: ChromaDB 'where' filter dict.

        Returns:
            List of result dicts sorted by relevance (highest first).
        """
        return self._store.search(
            query=query,
            k=k,
            where=metadata_filter,
        )

    # ------------------------------------------------------------------ Generation

    def answer(
        self,
        question: str,
        k: int = 5,
        metadata_filter: dict | None = None,
        system_prompt: str = "",
    ) -> dict[str, Any]:
        """Retrieve relevant chunks and generate a grounded answer.

        Returns:
            {
                "answer": str,
                "sources": list[dict],  # Retrieved chunks used as context
                "num_chunks_used": int,
            }
        """
        retrieved = self.retrieve(question, k=k, metadata_filter=metadata_filter)

        if not retrieved:
            return {
                "answer": "I could not find relevant information to answer this question.",
                "sources": [],
                "num_chunks_used": 0,
            }

        # Build context from retrieved chunks
        context_parts = []
        for i, chunk in enumerate(retrieved):
            context_parts.append(f"[Source {i+1}]\n{chunk['document']}")
        context = "\n\n".join(context_parts)

        default_system = (
            "You are a helpful assistant. Answer questions using ONLY the provided context. "
            "If the context does not contain enough information to answer, say so clearly. "
            "Cite the source number (e.g. [Source 1]) when using information from context."
        )

        response = self._llm.chat.completions.create(
            model=self._llm_model,
            messages=[
                {"role": "system", "content": system_prompt or default_system},
                {
                    "role": "user",
                    "content": f"Context:\n{context}\n\nQuestion: {question}",
                },
            ],
            max_tokens=1_000,
        )

        return {
            "answer": response.choices[0].message.content,
            "sources": retrieved,
            "num_chunks_used": len(retrieved),
        }
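`answer()` sends all k retrieved chunks regardless of their length. When chunks are long or k is large, a budget keeps the prompt inside the model's context window. The sketch below assumes the chunks arrive best-first (as `retrieve()` returns them) and approximates tokens with whitespace-separated words. `build_context` is a hypothetical helper that produces the same `[Source N]` labels `answer()` uses. A real tokenizer would give exact budgets.

```python
def build_context(chunks: list[str], max_words: int) -> tuple[str, int]:
    """Join retrieved chunks, best first, until an approximate word budget is spent.

    Returns the context string and how many chunks fit. Word count is a rough
    stand-in for tokens; swap in a real tokenizer for exact budgets.
    """
    parts: list[str] = []
    used = 0
    for i, chunk in enumerate(chunks):
        words = len(chunk.split())
        if used + words > max_words:
            break  # Stop at the first chunk that would overflow, keeping rank order
        parts.append(f"[Source {i + 1}]\n{chunk}")
        used += words
    return "\n\n".join(parts), len(parts)


ranked_chunks = ["alpha beta gamma", "delta epsilon", "zeta eta theta iota"]
context, n_used = build_context(ranked_chunks, max_words=5)
print(n_used)   # 2
print(context)
```

Stopping at the first chunk that does not fit, rather than skipping it and packing smaller ones, keeps the source numbers contiguous and never puts a lower-ranked chunk ahead of a better one.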

Part 8 -- Chunking Strategies

The chunking strategy directly affects retrieval quality. Different strategies work better for different content types.
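Overlap has a storage and embedding cost worth estimating before choosing parameters. For a word-window chunker like `RAGPipeline._chunk_text`, windows start at 0, step, 2·step, and so on, where step = size - overlap. The last window is the first one that reaches the end of the text, so a text of N words yields 1 + ceil(max(0, N - size) / step) chunks. The sketch below checks that formula against the windowing loop and measures the duplication. `expected_chunk_count` and `chunk_starts` are hypothetical helpers.

```python
import math


def expected_chunk_count(n_words: int, size: int, overlap: int) -> int:
    """Number of windows a fixed-size chunker with overlap produces."""
    if n_words == 0:
        return 0
    step = size - overlap
    return 1 + math.ceil(max(0, n_words - size) / step)


def chunk_starts(n_words: int, size: int, overlap: int) -> list[int]:
    """Start offsets produced by the windowing loop, for checking the formula."""
    starts = []
    for i in range(0, n_words, size - overlap):
        starts.append(i)
        if i + size >= n_words:
            break
    return starts


starts = chunk_starts(1200, size=500, overlap=50)
stored = sum(min(500, 1200 - s) for s in starts)  # Words embedded across all chunks
print(starts)                                   # [0, 450, 900]
print(expected_chunk_count(1200, 500, 50))      # 3
print(f"{stored / 1200:.2f}x words stored")     # 1.08x
```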

import re
from typing import Iterator


def chunk_fixed_size(text: str, size: int = 500, overlap: int = 50) -> Iterator[str]:
    """Fixed word-count chunks with overlap.

    Fastest. Ignores sentence or paragraph boundaries.
    Good for: unstructured text, logs, code.
    Bad for: structured documents where semantics depend on section context.
    """
    if not 0 <= overlap < size:
        raise ValueError("overlap must satisfy 0 <= overlap < size")
    words = text.split()
    step = size - overlap
    for i in range(0, len(words), step):
        chunk = " ".join(words[i: i + size])
        if chunk.strip():
            yield chunk
        if i + size >= len(words):
            break  # This window reached the end; later windows would only repeat its tail


def chunk_sentences(text: str, max_sentences: int = 5, overlap: int = 1) -> Iterator[str]:
    """Chunks by sentence boundaries.

    Preserves sentence integrity -- no sentence is split mid-way.
    Chunk sizes vary based on sentence length.
    Good for: prose, news articles, academic papers.
    """
    if not 0 <= overlap < max_sentences:
        raise ValueError("overlap must satisfy 0 <= overlap < max_sentences")
    # Simple sentence splitter -- in production use spacy or nltk
    sentence_endings = re.compile(r"(?<=[.!?])\s+")
    sentences = sentence_endings.split(text.strip())
    sentences = [s.strip() for s in sentences if s.strip()]

    step = max_sentences - overlap
    for i in range(0, len(sentences), step):
        chunk = " ".join(sentences[i: i + max_sentences])
        if chunk:
            yield chunk
        if i + max_sentences >= len(sentences):
            break  # Last window reached the end; avoid a redundant tail chunk


def chunk_by_paragraph(text: str, max_chars: int = 1500) -> Iterator[str]:
    """Chunks at paragraph boundaries.

    A paragraph is the natural unit of thought in most documents.
    Paragraphs that exceed max_chars are split by sentence.
    Good for: markdown documents, articles, documentation.
    """
    paragraphs = re.split(r"\n\s*\n", text.strip())

    current_chunk = []
    current_len = 0

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        # +2 accounts for the "\n\n" joiner placed before this paragraph
        added_len = len(para) + (2 if current_chunk else 0)
        if current_len + added_len > max_chars and current_chunk:
            yield "\n\n".join(current_chunk)
            current_chunk = []
            current_len = 0

        if len(para) > max_chars:
            # Oversized paragraph -- split by sentences
            yield from chunk_sentences(para, max_sentences=4)
        else:
            current_len += len(para) + (2 if current_chunk else 0)
            current_chunk.append(para)

    if current_chunk:
        yield "\n\n".join(current_chunk)


def chunk_recursive(
    text: str,
    chunk_size: int = 1000,
    overlap: int = 100,
    separators: list[str] | None = None,
) -> Iterator[str]:
    """Recursive character text splitter (LangChain-style).

    Tries separators in order of preference: double newline, single newline,
    sentence boundary, word boundary, character. Splits at the highest-level
    separator that produces chunks within the size limit.

    This is the most generally useful strategy for mixed content.
    Good for: code files, markdown, mixed-format documents.
    """
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    if separators is None:
        separators = ["\n\n", "\n", ". ", " ", ""]

    def _split(text: str, separator_idx: int) -> list[str]:
        if separator_idx >= len(separators):
            # No separator worked -- hard split by character count
            step = chunk_size - overlap
            return [text[i: i + chunk_size] for i in range(0, len(text), step)]

        sep = separators[separator_idx]
        splits = text.split(sep) if sep else list(text)

        chunks: list[str] = []
        current: list[str] = []
        current_len = 0  # Always equals len(sep.join(current))

        def _finalise() -> None:
            chunk_text = sep.join(current)
            if len(chunk_text) > chunk_size:
                # Still too large -- recurse with next separator
                chunks.extend(_split(chunk_text, separator_idx + 1))
            elif chunk_text:
                chunks.append(chunk_text)

        for split in splits:
            added_len = len(split) + (len(sep) if current else 0)
            if current_len + added_len > chunk_size and current:
                # Current chunk is full -- finalise it
                _finalise()
                # Keep overlap: retain trailing splits totalling at most `overlap` chars
                while current and current_len > overlap:
                    removed = current.pop(0)
                    current_len -= len(removed) + (len(sep) if current else 0)
                added_len = len(split) + (len(sep) if current else 0)

            current.append(split)
            current_len += added_len

        if current:
            _finalise()

        return chunks

    for chunk in _split(text, 0):
        if chunk.strip():
            yield chunk.strip()
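The regex splitter in `chunk_sentences` treats every `.`, `!` or `?` followed by whitespace as a sentence end. That is why its comment recommends spaCy or NLTK for production. A quick check on text containing abbreviations shows where it breaks. The example sentence is made up for illustration:

```python
import re

# The same pattern used in chunk_sentences above
sentence_endings = re.compile(r"(?<=[.!?])\s+")

text = "Dr. Smith arrived at 3 p.m. today. He left early!"
sentences = sentence_endings.split(text)
print(sentences)
# ['Dr.', 'Smith arrived at 3 p.m.', 'today.', 'He left early!']
```

The text has two sentences, but the splitter returns four because "Dr." and "p.m." look like sentence ends. In `chunk_sentences` this shifts chunk boundaries, and `max_sentences` ends up counting fragments rather than real sentences.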

Part 9 -- Reranking Retrieved Results

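Vector retrieval is fast but approximate. The query and each document are embedded independently, so their similarity cannot account for how specific query terms relate to specific passages in the document. A cross-encoder reranker reads the query and each candidate together, scores them more carefully, and reorders the results. The pattern: retrieve 20 candidates with the fast vector search, then rerank to pick the best 5.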

from sentence_transformers import CrossEncoder


class CrossEncoderReranker:
    """Reranks retrieved documents using a cross-encoder model.

    A cross-encoder takes a (query, document) pair as input and outputs
    a single relevance score. This is more accurate than bi-encoder cosine
    similarity because the query and document attend to each other.

    Cost: one full transformer forward pass per (query, document) pair at
    query time -- much slower than vector search, which only compares the
    query embedding against precomputed document embeddings.
    This is why reranking is applied to a small candidate set (20-50),
    not the full corpus.

    Recommended models:
    - cross-encoder/ms-marco-MiniLM-L-6-v2: fast, good quality
    - cross-encoder/ms-marco-electra-base: slower, better quality
    """

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self._model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        documents: list[str],
        top_k: int | None = None,
    ) -> list[tuple[int, float, str]]:
        """Rerank documents by relevance to the query.

        Args:
            query: The user's query.
            documents: List of candidate document strings.
            top_k: Return only the top k results. None returns all, sorted.

        Returns:
            List of (original_index, score, document) tuples, sorted by score descending.
        """
        if not documents:
            return []

        # Create (query, document) pairs for the cross-encoder
        pairs = [(query, doc) for doc in documents]
        scores = self._model.predict(pairs)  # NumPy array: one relevance score per pair

        # Sort by score descending, keeping each document's original index
        ranked = sorted(
            enumerate(zip(scores, documents)),
            key=lambda x: x[1][0],
            reverse=True,
        )

        results = [(idx, float(score), doc) for idx, (score, doc) in ranked]
        return results[:top_k] if top_k is not None else results


def retrieve_and_rerank(
    query: str,
    store: ChromaDocumentStore,
    reranker: CrossEncoderReranker,
    initial_k: int = 20,
    final_k: int = 5,
) -> list[dict]:
    """Two-stage retrieval: vector search + cross-encoder reranking.

    Stage 1: Vector search retrieves initial_k candidates quickly.
    Stage 2: Cross-encoder reranks candidates precisely, returns final_k.

    The reranker can only reorder what stage 1 returns, so initial_k caps
    recall: a larger initial_k gives the right chunk more chances to reach
    the reranker, at a reranking cost that grows linearly with initial_k.
    """
    # Stage 1: fast retrieval
    candidates = store.search(query, k=initial_k)

    # Stage 2: precise reranking
    doc_texts = [c["document"] for c in candidates]
    reranked = reranker.rerank(query, doc_texts, top_k=final_k)

    # Map back to the original candidate dicts with updated scores
    results = []
    for original_idx, new_score, _doc_text in reranked:
        candidate = candidates[original_idx].copy()
        candidate["rerank_score"] = new_score
        candidate["vector_score"] = candidate.pop("score", None)  # None if the store returns no "score"
        results.append(candidate)

    return results
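The two-stage pattern itself does not depend on Chroma or on a particular cross-encoder. The sketch below reduces it to two scoring functions so the control flow is visible. `two_stage_search`, the word-overlap `cheap` scorer and the lookup-table `precise` scorer are hypothetical stand-ins for vector search and a cross-encoder. The example also makes one limitation concrete: stage 2 only reorders what stage 1 returns, so a document that stage 1 misses cannot be recovered.

```python
from typing import Callable

Scorer = Callable[[str, str], float]


def two_stage_search(
    query: str,
    corpus: list[str],
    cheap_score: Scorer,
    precise_score: Scorer,
    initial_k: int = 20,
    final_k: int = 5,
) -> list[int]:
    """Return indices of the final_k best documents after two-stage scoring."""
    # Stage 1: score the whole corpus cheaply and keep the top initial_k indices
    candidates = sorted(
        range(len(corpus)),
        key=lambda i: cheap_score(query, corpus[i]),
        reverse=True,
    )[:initial_k]
    # Stage 2: rescore only those candidates with the expensive scorer
    reranked = sorted(candidates, key=lambda i: precise_score(query, corpus[i]), reverse=True)
    return reranked[:final_k]


CORPUS = ["python web framework", "python snake", "django flask framework", "reptile facts"]
JUDGEMENTS = {CORPUS[0]: 0.8, CORPUS[1]: 0.1, CORPUS[2]: 0.9, CORPUS[3]: 0.0}


def cheap(q: str, d: str) -> float:
    # Stand-in for vector search: number of shared words
    return len(set(q.split()) & set(d.split()))


def precise(q: str, d: str) -> float:
    # Stand-in for a cross-encoder: fixed relevance judgements per document
    return JUDGEMENTS[d]


print(two_stage_search("python framework", CORPUS, cheap, precise, initial_k=3, final_k=2))  # [2, 0]
print(two_stage_search("python framework", CORPUS, cheap, precise, initial_k=1, final_k=2))  # [0]
```

With `initial_k=1`, the document the precise scorer likes best (index 2) never reaches stage 2. This is the recall ceiling that `initial_k` controls.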

Part 10 -- Hybrid Search: BM25 + Vector

Vector search finds semantically similar content. BM25 (the algorithm behind most full-text search engines) finds exact keyword matches. Combining both with Reciprocal Rank Fusion (RRF) typically outperforms either alone.

import re
from typing import Any

import numpy as np
from rank_bm25 import BM25Okapi


class HybridSearchEngine:
    """Combines BM25 keyword search with vector search using Reciprocal Rank Fusion.

    RRF formula: score(d) = sum over result lists i of 1 / (k + rank_i(d)),
    where rank_i(d) is d's 1-based position in list i and k is a constant
    (typically 60) that dampens the advantage of the very top ranks.

    The key property of RRF: it is rank-based, not score-based. You do not
    need to normalise or calibrate the scores from BM25 and vector search --
    only the ranks matter.

    Results are matched by document text, so build_bm25_index() must receive
    exactly the chunk strings that were added to the vector store.
    """

    def __init__(
        self,
        vector_store: ChromaDocumentStore,
        rrf_k: int = 60,
    ):
        self._vector_store = vector_store
        self._rrf_k = rrf_k
        self._bm25: BM25Okapi | None = None
        self._bm25_docs: list[str] = []

    def _tokenise(self, text: str) -> list[str]:
        """Simple tokeniser: lowercase, split on non-alphanumeric."""
        return re.findall(r"[a-z0-9]+", text.lower())

    def build_bm25_index(self, documents: list[str]) -> None:
        """Build or rebuild the BM25 index over the document corpus."""
        self._bm25_docs = documents
        tokenised = [self._tokenise(doc) for doc in documents]
        self._bm25 = BM25Okapi(tokenised)

    def search(
        self,
        query: str,
        k: int = 5,
        vector_k: int = 20,  # Retrieve more from each source before fusing
        bm25_k: int = 20,
    ) -> list[dict[str, Any]]:
        """Hybrid search: BM25 + vector + RRF.

        Args:
            query: Search query.
            k: Final number of results to return after fusion.
            vector_k: Top-k to retrieve from vector search before fusion.
            bm25_k: Top-k to retrieve from BM25 before fusion.

        Returns:
            Fused results sorted by RRF score (highest first).
        """
        # ----- BM25 results -----
        if self._bm25 is None:
            bm25_results = []
        else:
            tokenised_query = self._tokenise(query)
            bm25_scores = self._bm25.get_scores(tokenised_query)
            bm25_top_indices = np.argsort(bm25_scores)[::-1][:bm25_k]
            bm25_results = [
                {"id": f"bm25_{i}", "document": self._bm25_docs[i], "bm25_rank": rank}
                for rank, i in enumerate(bm25_top_indices)
                # Keep only documents where some query term contributed a positive
                # score; otherwise non-matching documents would earn RRF credit
                if bm25_scores[i] > 0
            ]

        # ----- Vector results -----
        vector_results = self._vector_store.search(query, k=vector_k)
        for rank, result in enumerate(vector_results):
            result["vector_rank"] = rank

        # ----- Reciprocal Rank Fusion -----
        # Build a map from document content to fused scores.
        # rank + 1 converts enumerate's 0-based rank to the 1-based rank in the formula.
        doc_scores: dict[str, dict] = {}

        for rank, result in enumerate(bm25_results):
            doc = result["document"]
            if doc not in doc_scores:
                doc_scores[doc] = {"rrf_score": 0.0, "document": doc, "metadata": {}}
            doc_scores[doc]["rrf_score"] += 1.0 / (self._rrf_k + rank + 1)

        for rank, result in enumerate(vector_results):
            doc = result["document"]
            if doc not in doc_scores:
                doc_scores[doc] = {"rrf_score": 0.0, "document": doc, "metadata": {}}
            # BM25 hits carry no metadata, so take it from the vector result
            doc_scores[doc]["metadata"] = result.get("metadata") or {}
            doc_scores[doc]["rrf_score"] += 1.0 / (self._rrf_k + rank + 1)

        # Sort by fused score and return top-k
        fused = sorted(doc_scores.values(), key=lambda x: x["rrf_score"], reverse=True)
        return fused[:k]
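Stripped of the BM25 and Chroma plumbing, the fusion step above takes only a few lines. This standalone sketch uses hypothetical document IDs, so the arithmetic is easy to check by hand. `reciprocal_rank_fusion` is an illustrative name, not a library function.

```python
def reciprocal_rank_fusion(
    ranked_lists: list[list[str]], k: int = 60
) -> list[tuple[str, float]]:
    """Fuse several ranked lists of document IDs with RRF (ranks are 1-based)."""
    scores: dict[str, float] = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


bm25_ranking = ["a", "b", "c"]
vector_ranking = ["b", "d", "a"]
for doc_id, score in reciprocal_rank_fusion([bm25_ranking, vector_ranking]):
    print(doc_id, round(score, 5))
# b 0.03252
# a 0.03227
# d 0.01613
# c 0.01587
```

"b" finishes first although BM25 ranked "a" above it, because "b" sits near the top of both lists. "d", found only by vector search, still edges out "c", found only by BM25, because its single rank is better.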

Part 11 -- Production Considerations

Embedding Drift

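Embedding models get replaced. Suppose you move from text-embedding-3-small to a newer model. Query embeddings from the new model live in a different vector space from the document embeddings already in your database, often with a different dimension. Comparing them is meaningless until the corpus is re-embedded.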

import hashlib
from datetime import datetime, timezone


class EmbeddingVersionTracker:
    """Tracks embedding model versions to detect and handle drift.

    When you change embedding models, you must re-embed the entire corpus.
    This class helps you track which documents need re-embedding.
    """

    def __init__(self):
        # In production, store this in your database alongside the embeddings
        self._document_versions: dict[str, dict] = {}

    def compute_fingerprint(self, model_name: str, model_version: str) -> str:
        """Compute a fingerprint for a model+version combination."""
        return hashlib.sha256(f"{model_name}:{model_version}".encode()).hexdigest()[:16]

    def record_embedding(self, doc_id: str, model_name: str, model_version: str) -> None:
        """Record that a document was embedded with a specific model version."""
        self._document_versions[doc_id] = {
            "model_name": model_name,
            "model_version": model_version,
            "fingerprint": self.compute_fingerprint(model_name, model_version),
            "embedded_at": datetime.now(timezone.utc).isoformat(),  # Timezone-aware UTC
        }

    def needs_reembedding(
        self,
        doc_id: str,
        current_model: str,
        current_version: str,
    ) -> bool:
        """Return True if the document needs to be re-embedded."""
        if doc_id not in self._document_versions:
            return True  # Never embedded

        current_fp = self.compute_fingerprint(current_model, current_version)
        stored_fp = self._document_versions[doc_id]["fingerprint"]
        return current_fp != stored_fp

    def stale_documents(
        self,
        all_doc_ids: list[str],
        current_model: str,
        current_version: str,
    ) -> list[str]:
        """Return list of document IDs that need re-embedding."""
        return [
            doc_id for doc_id in all_doc_ids
            if self.needs_reembedding(doc_id, current_model, current_version)
        ]
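A toy model of drift shows why re-embedding is mandatory rather than optional. The sketch below simulates a model change as a random rotation of the embedding space. This is a deliberate simplification: real model changes are not pure rotations and often change the dimension, in which case the comparison fails outright. Within one model, similarities survive the change. Comparing an old document vector with a new query vector does not work.

```python
import numpy as np


def unit(v: np.ndarray) -> np.ndarray:
    return v / np.linalg.norm(v)


rng = np.random.default_rng(0)
dim = 384

# "Old model": a document and a query that is close to it in meaning
doc_old = unit(rng.standard_normal(dim))
query_old = unit(doc_old + 0.3 * unit(rng.standard_normal(dim)))

# "New model", simulated as a random orthogonal rotation of the old space
rotation, _ = np.linalg.qr(rng.standard_normal((dim, dim)))
doc_new = rotation @ doc_old
query_new = rotation @ query_old

sim_old = float(doc_old @ query_old)    # both vectors from the old model
sim_new = float(doc_new @ query_new)    # both from the new model: rotations preserve dot products
sim_mixed = float(doc_old @ query_new)  # stale document vector vs fresh query vector

print(f"old/old: {sim_old:.3f}  new/new: {sim_new:.3f}  old/new: {sim_mixed:.3f}")
```

The old/new similarity lands near zero even though the two texts are close in meaning. A partially stale corpus therefore returns poor neighbours without raising any error, which is why per-document version tracking matters.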

Index Update Strategy

# Two strategies for keeping your index current:

# Strategy 1: Batch rebuild (simplest, safest)
# - Suitable for datasets that update daily or weekly
# - Build a new index in parallel, then swap atomically
# - Zero downtime: old index serves queries until new one is ready

from collections.abc import Callable


def rebuild_index_in_background(
    documents: list[Document],
    embed_fn: Callable,
    new_store_path: str,
) -> ChromaDocumentStore:
    """Build a fresh index and return it when ready.

    Run this in a background thread: the old index keeps serving queries
    until it returns, and the caller then atomically replaces the reference.
    """
    # embed_fn is not used below: the new store embeds with whatever embedding
    # setup ChromaDocumentStore applies when constructed with these arguments.
    new_store = ChromaDocumentStore(
        collection_name="documents_v2",
        persist_directory=new_store_path,
    )

    texts = [d.content for d in documents]
    metas = [d.metadata for d in documents]

    new_store.add(texts, metas)
    return new_store


# Strategy 2: Incremental upsert (more complex, real-time)
# - Suitable for data that changes continuously
# - Track document modification timestamps
# - Only re-embed documents that have changed since last index update
# - Upsert semantics: existing IDs are replaced, new IDs are added

def incremental_update(
    changed_documents: list[Document],
    store: ChromaDocumentStore,
    deleted_ids: list[str] | None = None,
) -> None:
    """Update the index with changed documents only."""
    if deleted_ids:
        store._collection.delete(ids=deleted_ids)

    if changed_documents:
        texts = [d.content for d in changed_documents]
        metas = [d.metadata for d in changed_documents]
        ids = [d.id for d in changed_documents]
        # store.add may not overwrite existing IDs (Chroma's collection.add() does not),
        # so delete the old versions first to get upsert behaviour
        store._collection.delete(ids=ids)
        store.add(texts, metas, ids)
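In Strategy 1, the caller swaps the live index atomically, but the code leaves that step to the caller. In a single process, one way to do it is a small holder object guarded by a lock. `SwappableIndex` and `rebuild_async` are hypothetical names. The index type is left generic: any object works, for example a `ChromaDocumentStore`. A multi-process deployment would need a different, shared mechanism.

```python
import threading
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class SwappableIndex(Generic[T]):
    """Holds the live index; readers always get a complete, ready index."""

    def __init__(self, initial: T) -> None:
        self._current = initial
        self._lock = threading.Lock()

    def get(self) -> T:
        with self._lock:
            return self._current

    def swap(self, new_index: T) -> T:
        """Install new_index and return the previous one (e.g. to clean it up)."""
        with self._lock:
            old, self._current = self._current, new_index
        return old


def rebuild_async(holder: SwappableIndex[T], build: Callable[[], T]) -> threading.Thread:
    """Run build() in a background thread, then swap its result in."""

    def worker() -> None:
        new_index = build()  # Slow part: queries keep using the old index meanwhile
        holder.swap(new_index)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


holder = SwappableIndex({"version": 1})
release = threading.Event()


def slow_build() -> dict:
    release.wait()  # Simulate a long re-embedding job
    return {"version": 2}


thread = rebuild_async(holder, slow_build)
print(holder.get()["version"])  # 1: the old index is still serving
release.set()
thread.join()
print(holder.get()["version"])  # 2: swapped after the build finished
```

If `build()` raises, the worker thread ends before `swap()` is called and the old index stays live. That is the safe failure mode, though production code should also log the error.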

Key Takeaways

  • Embeddings encode meaning as geometry: semantically similar texts map to nearby points in the vector space. The angle between vectors (cosine similarity) measures semantic similarity; magnitude is irrelevant.
  • Use dot product for speed when vectors are unit-normalised: for unit vectors, dot product and cosine similarity are mathematically equivalent. Most embedding models output unit-normalised vectors -- confirm before choosing your similarity metric.
  • Choose your vector store by scale and operational complexity: ChromaDB for development and single-machine production; pgvector when you already use Postgres and want transactional consistency; Pinecone/Weaviate/Qdrant for multi-million vector production deployments with multi-region needs.
  • FAISS IndexFlatIP is exact; IndexIVFFlat is approximate: for datasets under roughly 100K vectors, exact search is usually fast enough. For larger datasets, approximate search with n_probe tuning trades a small, measurable loss in recall for much faster queries.
  • Chunk strategy matters as much as embedding model: fixed-size chunking is fast but crude. Sentence and paragraph-based chunking respects semantic boundaries. Recursive splitting adapts to content structure.
  • Two-stage retrieval (vector + rerank) usually outperforms vector search alone: retrieve 20 candidates cheaply with vector search, then rerank precisely with a cross-encoder. The extra cost stays bounded because only the candidate set is rescored, but the reranker can only reorder what vector search found.
  • Hybrid search (BM25 + vector + RRF) typically beats pure vector search: BM25 excels at exact keyword matching; vector search excels at semantic similarity. RRF combines both without score normalisation.
  • Plan for embedding drift before day one: when you change embedding models, you must re-embed the entire corpus. Track model versions per document so you can identify what needs re-processing.
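The takeaway on dot product says to confirm normalisation before choosing the metric. The sketch below shows a quick check on a batch of embeddings, plus a fallback normalisation. The helper names and the 1e-3 tolerance are illustrative choices, not values from any library.

```python
import numpy as np


def is_unit_normalised(embeddings: np.ndarray, atol: float = 1e-3) -> bool:
    """True if every row of an (n, d) embedding matrix has L2 norm close to 1."""
    norms = np.linalg.norm(embeddings, axis=1)
    return bool(np.allclose(norms, 1.0, atol=atol))


def normalise_rows(embeddings: np.ndarray) -> np.ndarray:
    """Scale each row to unit length so dot product equals cosine similarity."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings / np.maximum(norms, 1e-12)  # guard against zero vectors


vectors = np.array([[3.0, 4.0], [1.0, 0.0]])
print(is_unit_normalised(vectors))                # False: the first row has norm 5
unit_vectors = normalise_rows(vectors)
print(is_unit_normalised(unit_vectors))           # True
print(float(unit_vectors[0] @ unit_vectors[1]))   # 0.6, the cosine of the original pair
```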

Practice Problems

Problem 1: Similarity Comparison

Generate embeddings for these 8 texts using the local Sentence Transformers model (all-MiniLM-L6-v2):

  1. "Python is a high-level programming language."
  2. "Python programming was invented by Guido van Rossum."
  3. "Snakes are reptiles that can be found on every continent except Antarctica."
  4. "JavaScript is used for web development."
  5. "The Anaconda distribution packages Python for data science."
  6. "Machine learning uses Python extensively."
  7. "The Amazon jungle is home to many species."
  8. "Flask and Django are Python web frameworks."

Compute the full 8x8 cosine similarity matrix. Which pairs have the highest similarity? Does the result match your intuition about semantic similarity? Identify any surprising results and explain them based on what you know about how embedding models work.

Problem 2: Chunking Comparison

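Take a multi-page technical document (at least 3000 words -- use any article you can find or generate). Apply all four chunking strategies from Part 8 with comparable size parameters. The strategies measure size in different units (words, sentences, characters), so choose values that give roughly similar average chunk lengths. For each strategy: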

  1. Count the number of chunks produced
  2. Measure the token count distribution (min, max, mean, std)
  3. Pick a representative question that requires reasoning across sections
  4. Retrieve the top-3 chunks for that question using FAISS
  5. Rate the quality of the retrieved chunks (do they actually contain the answer?)

Which strategy produces the most retrievable chunks for your test question?

Problem 3: FAISS Exact vs Approximate

Build a FAISS index with 50,000 synthetic embeddings (use np.random.randn(50000, 384).astype(np.float32), normalise them). Build both an IndexFlatIP and an IndexIVFFlat(n_clusters=100). For 100 random queries:

  1. Record the exact nearest neighbours from IndexFlatIP
  2. Record the approximate nearest neighbours from IndexIVFFlat at n_probe=1, 10, 50
  3. Compute recall@10: for each query, what fraction of the exact top-10 appear in the approximate top-10?
  4. Measure query latency for each configuration

Plot the recall vs latency trade-off. What n_probe value gives >90% recall?
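For step 3, a helper along these lines would work. It assumes `exact_ids` and `approx_ids` are the `(n_queries, k)` ID arrays that FAISS's `index.search(queries, k)` returns as its second output. The name `recall_at_k` is illustrative.

```python
import numpy as np


def recall_at_k(exact_ids: np.ndarray, approx_ids: np.ndarray, k: int = 10) -> float:
    """Mean fraction of each query's exact top-k IDs found in its approximate top-k."""
    per_query = [
        len(set(exact[:k].tolist()) & set(approx[:k].tolist())) / k
        for exact, approx in zip(exact_ids, approx_ids)
    ]
    return float(np.mean(per_query))


exact = np.array([[1, 2, 3, 4], [5, 6, 7, 8]])
approx = np.array([[1, 2, 9, -1], [8, 7, 6, 5]])  # FAISS pads missing results with -1
print(recall_at_k(exact, approx, k=4))  # 0.75
```

Order within the top-k does not affect this metric, and -1 padding never matches a real ID. Random Gaussian vectors have no cluster structure, so recall at small n_probe is likely to be lower here than on real embeddings.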

Problem 4: RAG Evaluation

Build a small RAG pipeline using the RAGPipeline class from Part 7. Ingest 10 Wikipedia articles on different topics. Create a test set of 20 questions where you know the correct answer and which article it comes from.

Evaluate your pipeline on:

  • Retrieval recall@k: does the correct source document appear in the top-k retrieved chunks?
  • Answer correctness: does the LLM's generated answer correctly answer the question?

Try these variations and measure how they affect both metrics:

  1. Chunk size: 200 words vs 500 words vs 1000 words
  2. Retrieval k: 3 vs 5 vs 10
  3. With and without cross-encoder reranking (top-20 retrieve, rerank to top-5)

Report your findings in a table.

Problem 5: Production Index Manager

Build an IndexManager class that manages the full lifecycle of a vector index for a document collection that updates daily. It should:

  • Accept new and changed documents (upserts) via update(documents: list[Document])
  • Accept deleted document IDs via delete(ids: list[str])
  • Track which documents were embedded with which model version
  • Automatically detect when a model change requires a full re-embed
  • Support a needs_full_rebuild() method that returns True if more than 20% of documents are stale
  • Have a rebuild(new_embed_fn, new_model_name, new_model_version) method that rebuilds the index in a background thread and atomically swaps to the new index when ready

Write tests that verify: (1) incremental updates are reflected in search results, (2) full rebuild preserves all documents, (3) model version changes are correctly detected.

© 2026 EngineersOfAI. All rights reserved.